{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 20px 20px 20px 20px\"><img src=\"images/bro.png\" width=\"100px\"></div>\n",
    "\n",
    "# Zeek to Spark\n",
    "In this notebook will show how easy it is to load up really big Zeek/Zeek logs by using the classes within the Zeek Analysis Tools. We'll also show converting a Zeek log into a Parquet file in one line of code. \n",
    "\n",
    "<div style=\"float: right; margin: 30px 0px 0px 0px\"><img src=\"images/spark.png\" width=\"200px\"></div>\n",
    "\n",
    "### Software\n",
    "- Zeek Analysis Tools (ZAT): https://github.com/SuperCowPowers/zat\n",
    "- Parquet: https://parquet.apache.org\n",
    "- Spark: https://spark.apache.org\n",
    "\n",
    "### Data\n",
    "- HTTP log with ~2 million rows, to show speed/simplicity and simple Spark processing\n",
    "- Grab the Data: https://data.kitware.com/#collection/58d564478d777f0aef5d893a\n",
    "\n",
    "<div style=\"float: right; margin: -80px 0px 0px 0px\"><img src=\"images/parquet.png\" width=\"250px\"></div>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "ZAT: 0.3.7\n",
      "PySpark: 2.4.4\n"
     ]
    }
   ],
   "source": [
    "# Third Party Imports\n",
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "# Local imports\n",
    "import zat\n",
    "from zat import log_to_sparkdf\n",
    "\n",
    "# Good to print out versions of stuff\n",
    "print('ZAT: {:s}'.format(zat.__version__))\n",
    "print('PySpark: {:s}'.format(pyspark.__version__))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 20px 20px 20px 20px\"><img src=\"images/spark.png\" width=\"200px\"></div>\n",
    "\n",
    "# Spark It!\n",
    "### Spin up Spark with 4 Parallel Executors\n",
    "Here we're spinning up a local spark server with 4 parallel executors, although this might seem a bit silly since we're probably running this on a laptop, there are a couple of important observations:\n",
    "\n",
    "<div style=\"float: right; margin: 20px 20px 20px 20px\"><img src=\"images/spark_jobs.png\" width=\"400px\"></div>\n",
    "\n",
    "- If you have 4/8 cores use them!\n",
    "- It's the exact same code logic as if we were running on a distributed cluster.\n",
    "- We run the same code on **DataBricks** (www.databricks.com) which is awesome BTW.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Spin up a local Spark Session (with 4 executors)\n",
    "spark = SparkSession.builder.master('local[4]').appName('my_awesome').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use the ZAT class to load our log file into a Spark dataframe (2 lines of code!)\n",
    "spark_it = log_to_sparkdf.LogToSparkDF(spark)\n",
    "spark_df = spark_it.create_dataframe('/Users/briford/data/bro/http.log')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 0px 0px 0px -80px\"><img src=\"images/spark_distributed.png\" width=\"500px\"></div>\n",
    "\n",
    "# Spark Workers and Data Partitions\n",
    "Spark will read in and partition the data out to our workers. Our dataframe(rdd) will have some number of partitions that are divided up amongst the worker pool. Each worker will operate on only a subset of the data and Spark will manage the 'magic' for how that work gets run, aggregated and presented.\n",
    "\n",
    "\n",
    "**Image Credit:** Jacek Laskowski, please see his excellent book - Mastering Apache Spark  https://jaceklaskowski.gitbooks.io/mastering-apache-spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "11"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark_df.rdd.getNumPartitions()"
   ]
  },
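  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The partition count reported above is picked by Spark when it reads the file, but it can be changed. Here is a minimal sketch, assuming a local session and a small stand-in DataFrame built with `spark.range` (the names `toy_df`, `wide_df`, and `narrow_df` are hypothetical and not part of ZAT); the same `repartition` and `coalesce` calls should apply to `spark_df`.\n",
    "\n",
    "```python
from pyspark.sql import SparkSession

# Reuses the running session if one exists, otherwise starts a local one
spark = SparkSession.builder.master('local[4]').appName('partition_sketch').getOrCreate()

# Stand-in data (assumption): 1000 rows with a single 'id' column
toy_df = spark.range(1000)
print('Initial partitions: {:d}'.format(toy_df.rdd.getNumPartitions()))

# repartition() does a full shuffle and can raise or lower the count
wide_df = toy_df.repartition(8)
print('After repartition(8): {:d}'.format(wide_df.rdd.getNumPartitions()))

# coalesce() merges existing partitions without a full shuffle, so it can only lower the count
narrow_df = wide_df.coalesce(2)
print('After coalesce(2): {:d}'.format(narrow_df.rdd.getNumPartitions()))
```\n",
    "\n",
    "Having more partitions than cores lets Spark balance uneven work, while very small partitions add scheduling overhead, so the best count depends on the data and the machine."
   ]
  },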
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: left; margin: 20px 20px 20px 20px\"><img src=\"images/nuked_crop.jpg\" width=\"150px\"></div>\n",
    "\n",
    "\n",
    "# Light it Up!\n",
    "Here we're going to demonstrate just a few simple Spark operations but obviously you now have the full power of the Death Star in your hands.\n",
    "\n",
    "<div style=\"float: left; margin: 0px 0px 0px 50px\"><img src=\"images/spark_sql.jpg\" width=\"150px\"></div>\n",
    "<div style=\"float: left; margin: -20px 50px 0px 0px\"><img src=\"images/mllib.png\" width=\"150px\"></div>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of Rows: 2048442\n",
      "Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,trans_depth,method,host,uri,referrer,user_agent,request_body_len,response_body_len,status_code,status_msg,info_code,info_msg,filename,tags,username,password,proxied,orig_fuids,orig_mime_types,resp_fuids,resp_mime_types\n"
     ]
    }
   ],
   "source": [
    "# Get information about the Spark DataFrame\n",
    "num_rows = spark_df.count()\n",
    "print(\"Number of Rows: {:d}\".format(num_rows))\n",
    "columns = spark_df.columns\n",
    "print(\"Columns: {:s}\".format(','.join(columns)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------+---------------+--------------+-----------+--------------------+\n",
      "|     id_orig_h|           host|           uri|status_code|          user_agent|\n",
      "+--------------+---------------+--------------+-----------+--------------------+\n",
      "|192.168.202.79|192.168.229.251|/DEASLog02.nsf|        404|Mozilla/5.0 (comp...|\n",
      "|192.168.202.79|192.168.229.251|/DEASLog03.nsf|        404|Mozilla/5.0 (comp...|\n",
      "|192.168.202.79|192.168.229.251|/DEASLog04.nsf|        404|Mozilla/5.0 (comp...|\n",
      "|192.168.202.79|192.168.229.251|/DEASLog05.nsf|        404|Mozilla/5.0 (comp...|\n",
      "|192.168.202.79|192.168.229.251|  /DEASLog.nsf|        404|Mozilla/5.0 (comp...|\n",
      "+--------------+---------------+--------------+-----------+--------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark_df.select(['id_orig_h', 'host', 'uri', 'status_code', 'user_agent']).show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----------+-------+\n",
      "| method|status_code|  count|\n",
      "+-------+-----------+-------+\n",
      "|   HEAD|        404|1294022|\n",
      "|    GET|        404| 429283|\n",
      "|   POST|        200| 125638|\n",
      "|    GET|        200|  88631|\n",
      "|   POST|          0|  32918|\n",
      "|    GET|        400|  29152|\n",
      "|    GET|        303|  10858|\n",
      "|    GET|        403|   8530|\n",
      "|   POST|        404|   4277|\n",
      "|    GET|        304|   3851|\n",
      "|    GET|        302|   3250|\n",
      "|    GET|          0|   2906|\n",
      "|    GET|        401|   2159|\n",
      "|OPTIONS|        200|   1897|\n",
      "|   POST|        302|   1226|\n",
      "|   HEAD|        503|   1010|\n",
      "|   POST|        206|    869|\n",
      "|    GET|        301|    642|\n",
      "|   HEAD|          0|    606|\n",
      "|    GET|        503|    550|\n",
      "+-------+-----------+-------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark_df.groupby('method','status_code').count().sort('count', ascending=False).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 30px 0px 0px 0px\"><img src=\"images/parquet.png\" width=\"400px\"></div>\n",
    "\n",
    "# What about Parquet files?\n",
    "Apache Parquet is a columnar storage format focused on performance. Parquet data is often used within the Hadoop ecosystem and converting your Zeek/Zeek log to a Parquet file is one line of code!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# DataFrames can be saved as Parquet files, maintaining the schema information.\n",
    "spark_df.write.parquet('http.parquet')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Have Spark read in the Parquet File\n",
    "spark_df = spark.read.parquet(\"http.parquet\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 20px 20px 20px 20px\"><img src=\"images/compressed.jpeg\" width=\"300px\"></div>\n",
    "\n",
    "# Parquet files are compressed\n",
    "Here we see the first benefit of Parquet which stores data with compressed columnar format. There are several compression options available (including uncompressed).\n",
    "\n",
    "## Original http.log = 1.3 GB \n",
    "## http.parquet = ~100 MB (multi-file)"
   ]
  },
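  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The compression option mentioned above can be set when writing. Here is a minimal sketch, assuming a tiny stand-in DataFrame and a temporary output directory (both hypothetical, as are the names `toy_df`, `out_dir`, and `row_counts`). `compression` is a keyword argument of PySpark's `DataFrameWriter.parquet`, and `snappy`, `gzip`, and `uncompressed` are among the codecs it accepts.\n",
    "\n",
    "```python
import os
import tempfile
from pyspark.sql import SparkSession

# Reuses the running session if one exists, otherwise starts a local one
spark = SparkSession.builder.master('local[4]').appName('parquet_codecs').getOrCreate()

# Stand-in data (assumption): three rows shaped like two of the http.log columns
toy_df = spark.createDataFrame([('GET', 200), ('HEAD', 404), ('POST', 200)], ['method', 'status_code'])

# Write the same data once per codec, then read it back to confirm the round trip
out_dir = tempfile.mkdtemp()
row_counts = {}
for codec in ['snappy', 'gzip', 'uncompressed']:
    path = os.path.join(out_dir, 'toy_{:s}.parquet'.format(codec))
    toy_df.write.parquet(path, mode='overwrite', compression=codec)
    row_counts[codec] = spark.read.parquet(path).count()
print(row_counts)
```\n",
    "\n",
    "On real logs, comparing the on-disk size of each output directory is a quick way to pick a codec; no sizes are claimed here for the toy data."
   ]
  },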
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 20px 20px 20px 20px\"><img src=\"images/fast.jpg\" width=\"350px\"></div>\n",
    "\n",
    "# Did we mention fast?\n",
    "The query below was executed on 4 workers. The data contains over 2 million HTTP requests/responses and the time to complete was a **fraction of a second** running on my Mac Laptop :)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----------+-------+\n",
      "| method|status_code|  count|\n",
      "+-------+-----------+-------+\n",
      "|   HEAD|        404|1294022|\n",
      "|    GET|        404| 429283|\n",
      "|   POST|        200| 125638|\n",
      "|    GET|        200|  88631|\n",
      "|   POST|          0|  32918|\n",
      "|    GET|        400|  29152|\n",
      "|    GET|        303|  10858|\n",
      "|    GET|        403|   8530|\n",
      "|   POST|        404|   4277|\n",
      "|    GET|        304|   3851|\n",
      "|    GET|        302|   3250|\n",
      "|    GET|          0|   2906|\n",
      "|    GET|        401|   2159|\n",
      "|OPTIONS|        200|   1897|\n",
      "|   POST|        302|   1226|\n",
      "|   HEAD|        503|   1010|\n",
      "|   POST|        206|    869|\n",
      "|    GET|        301|    642|\n",
      "|   HEAD|          0|    606|\n",
      "|    GET|        503|    550|\n",
      "+-------+-----------+-------+\n",
      "only showing top 20 rows\n",
      "\n",
      "CPU times: user 3.05 ms, sys: 1.41 ms, total: 4.46 ms\n",
      "Wall time: 369 ms\n"
     ]
    }
   ],
   "source": [
    "%time spark_df.groupby('method','status_code').count().sort('count', ascending=False).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 50px 0px 0px 20px\"><img src=\"images/deep_dive.jpeg\" width=\"350px\"></div>\n",
    "\n",
    "# Data looks good, lets take a deeper dive\n",
    "Spark has a powerful SQL engine as well as a Machine Learning library. So now that we've got the data loaded into a Spark Dataframe we're going to utilize Spark SQL commands to do some investigation and clustering using the Spark MLLib. For this deeper dive we're going to go to another notebook :)\n",
    "\n",
    "### Spark Clustering Notebook\n",
    "- [Zeek Spark Clustering](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/master/notebooks/Spark_Clustering.ipynb)\n",
    "\n",
    "<div style=\"float: left; margin: 0px 0px 0px 0px\"><img src=\"images/spark_sql.jpg\" width=\"150px\"></div>\n",
    "<div style=\"float: left; margin: -20px 50px 0px 0px\"><img src=\"images/mllib.png\" width=\"150px\"></div>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img align=\"right\" style=\"padding:20px\" src=\"images/SCP_med.png\" width=\"180\">\n",
    "\n",
    "## Wrap Up\n",
    "Well that's it for this notebook, we went from a Zeek log to a high performance Parquet file and then did some digging with high speed, parallel SQL and groupby operations.\n",
    "\n",
    "If you liked this notebook please visit the [ZAT](https://github.com/SuperCowPowers/zat) project for more notebooks and examples.\n",
    "\n",
    "## About SuperCowPowers\n",
    "The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheros or at least carry around rayguns and burner phones. <a href=\"https://www.supercowpowers.com\" target=\"_blank\">Visit SuperCowPowers</a>"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
